use crate::{Error, RefConfig};
use base64::prelude::*;
use bytes::Bytes;
use http::{header::ToStrError, HeaderMap, HeaderValue, StatusCode};
use lambda_runtime_api_client::body::Body;
use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    fmt::Debug,
    time::{Duration, SystemTime},
};
use tokio_stream::Stream;

/// Client context sent by the AWS Mobile SDK.
///
/// Every field is marked `#[serde(default)]`, so a key that is absent from
/// the incoming JSON deserializes to that field's `Default` value instead of
/// producing an error (an empty `{}` object is therefore accepted).
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct ClientContext {
    /// Information about the mobile application invoking the function.
    #[serde(default)]
    pub client: ClientApplication,
    /// Custom properties attached to the mobile event context.
    #[serde(default)]
    pub custom: HashMap<String, String>,
    /// Environment settings from the mobile client.
    #[serde(default)]
    pub environment: HashMap<String, String>,
}

/// AWS Mobile SDK client fields.
///
/// Fields are serialized with camelCase keys (for example `installationId`).
/// On deserialization each field additionally accepts its snake_case spelling
/// through a serde `alias`.
#[derive(Serialize, Deserialize, Default, Clone, Debug, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ClientApplication {
    /// The mobile app installation id.
    #[serde(alias = "installation_id")]
    pub installation_id: String,
    /// The app title for the mobile app as registered with AWS' mobile services.
    #[serde(alias = "app_title")]
    pub app_title: String,
    /// The version name of the application as registered with AWS' mobile services.
    #[serde(alias = "app_version_name")]
    pub app_version_name: String,
    /// The app version code.
    #[serde(alias = "app_version_code")]
    pub app_version_code: String,
    /// The package name for the mobile application invoking the function.
    #[serde(alias = "app_package_name")]
    pub app_package_name: String,
}

/// Cognito identity information sent with the event.
///
/// Fields are serialized with camelCase keys (`identityId`,
/// `identityPoolId`). Deserialization also accepts the `cognito`-prefixed and
/// snake_case spellings listed in each field's `alias` attributes. Neither
/// field has a serde default, so both must be present in the input.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct CognitoIdentity {
    /// The unique identity id for the Cognito credentials invoking the function.
    #[serde(alias = "cognitoIdentityId", alias = "identity_id")]
    pub identity_id: String,
    /// The identity pool id the caller is "registered" with.
    #[serde(alias = "cognitoIdentityPoolId", alias = "identity_pool_id")]
    pub identity_pool_id: String,
}

/// The Lambda function execution context. The values in this struct
/// are populated using the [Lambda environment variables](https://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html)
/// and [the headers returned by the poll request to the Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next).
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; use [`Context::new`] or [`Default::default`].
#[non_exhaustive]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Context {
    /// The AWS request ID generated by the Lambda service.
    pub request_id: String,
    /// The execution deadline for the current invocation, expressed as
    /// milliseconds since the Unix epoch. [`Context::deadline`] converts this
    /// value into a `SystemTime`.
    pub deadline: u64,
    /// The ARN of the Lambda function being invoked.
    pub invoked_function_arn: String,
    /// The X-Ray trace ID for the current invocation.
    pub xray_trace_id: Option<String>,
    /// The client context object sent by the AWS mobile SDK. This field is
    /// empty unless the function is invoked using an AWS mobile SDK.
    pub client_context: Option<ClientContext>,
    /// The Cognito identity that invoked the function. This field is empty
    /// unless the invocation request to the Lambda APIs was made using AWS
    /// credentials issued by Amazon Cognito Identity Pools.
    pub identity: Option<CognitoIdentity>,
    /// Lambda function configuration from the local environment variables.
    /// Includes information such as the function name, memory allocation,
    /// version, and log streams.
    pub env_config: RefConfig,
}

impl Default for Context {
    /// Builds a placeholder context: empty request id and function ARN, a
    /// deadline of `0`, no trace id / client context / identity, and a
    /// default function configuration.
    fn default() -> Self {
        let env_config = std::sync::Arc::new(crate::Config::default());

        Self {
            request_id: String::new(),
            deadline: 0,
            invoked_function_arn: String::new(),
            xray_trace_id: None,
            client_context: None,
            identity: None,
            env_config,
        }
    }
}

impl Context {
    /// Assemble a [Context] from the function configuration and the headers
    /// of the incoming invocation.
    ///
    /// Headers read:
    /// * `lambda-runtime-client-context` (optional, JSON) → `client_context`
    /// * `lambda-runtime-cognito-identity` (optional, JSON) → `identity`
    /// * `lambda-runtime-deadline-ms` (required, `u64`) → `deadline`
    /// * `lambda-runtime-invoked-function-arn` (optional) → `invoked_function_arn`;
    ///   a fixed placeholder message is stored when the header is absent
    /// * `lambda-runtime-trace-id` (optional) → `xray_trace_id`, decoded lossily as UTF-8
    ///
    /// # Errors
    ///
    /// Returns an error when a header that is read through `to_str` cannot be
    /// represented as a string, when either JSON header fails to deserialize,
    /// or when the deadline does not parse as a `u64`.
    ///
    /// # Panics
    ///
    /// Panics if the `lambda-runtime-deadline-ms` header is missing.
    pub fn new(request_id: &str, env_config: RefConfig, headers: &HeaderMap) -> Result<Self, Error> {
        // The optional JSON headers are decoded first, so their errors are
        // reported before the mandatory deadline header is inspected.
        let client_context: Option<ClientContext> = match headers.get("lambda-runtime-client-context") {
            Some(raw) => serde_json::from_str(raw.to_str()?)?,
            None => None,
        };

        let identity: Option<CognitoIdentity> = match headers.get("lambda-runtime-cognito-identity") {
            Some(raw) => serde_json::from_str(raw.to_str()?)?,
            None => None,
        };

        let deadline = headers
            .get("lambda-runtime-deadline-ms")
            .expect("missing lambda-runtime-deadline-ms header")
            .to_str()?
            .parse::<u64>()?;

        let invoked_function_arn = match headers.get("lambda-runtime-invoked-function-arn") {
            Some(arn) => arn.to_str()?.to_owned(),
            None => "No header lambda-runtime-invoked-function-arn found.".to_owned(),
        };

        let xray_trace_id = headers
            .get("lambda-runtime-trace-id")
            .map(|raw| String::from_utf8_lossy(raw.as_bytes()).into_owned());

        Ok(Self {
            request_id: request_id.to_owned(),
            deadline,
            invoked_function_arn,
            xray_trace_id,
            client_context,
            identity,
            env_config,
        })
    }

    /// The execution deadline for the current invocation, obtained by adding
    /// the stored millisecond count to the Unix epoch.
    pub fn deadline(&self) -> SystemTime {
        let since_epoch = Duration::from_millis(self.deadline);
        SystemTime::UNIX_EPOCH + since_epoch
    }
}

/// Read the invocation request id out of the `lambda-runtime-aws-request-id`
/// header of the incoming request.
///
/// Returns a `ToStrError` when the header value cannot be viewed as a string.
///
/// # Panics
///
/// Panics if the header is not present.
pub(crate) fn invoke_request_id(headers: &HeaderMap) -> Result<&str, ToStrError> {
    let raw_id = headers
        .get("lambda-runtime-aws-request-id")
        .expect("missing lambda-runtime-aws-request-id header");

    raw_id.to_str()
}

/// Incoming Lambda request containing the event payload and context.
///
/// `T` is the type of the event payload; the invocation [`Context`] is
/// carried alongside it.
#[derive(Clone, Debug)]
pub struct LambdaEvent<T> {
    /// Event payload.
    pub payload: T,
    /// Invocation context.
    pub context: Context,
}

impl<T> LambdaEvent<T> {
    /// Creates a new Lambda request
    pub fn new(payload: T, context: Context) -> Self {
        Self { payload, context }
    }

    /// Split the Lambda event into its payload and context.
    pub fn into_parts(self) -> (T, Context) {
        (self.payload, self.context)
    }
}

/// Metadata prelude for a stream response.
///
/// Field names use camelCase in the serialized form, so `status_code` is
/// written as `statusCode`.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MetadataPrelude {
    #[serde(with = "http_serde::status_code")]
    /// The HTTP status code, (de)serialized through `http_serde::status_code`.
    pub status_code: StatusCode,
    #[serde(with = "http_serde::header_map")]
    /// The HTTP headers, (de)serialized through `http_serde::header_map`.
    pub headers: HeaderMap,
    /// The HTTP cookies.
    pub cookies: Vec<String>,
}

/// Conversion of an error value into the text of a stream error trailer.
///
/// NOTE(review): the method name `to_tailer` looks like a misspelling of
/// `to_trailer`; it is kept because renaming it would change the public API.
pub trait ToStreamErrorTrailer {
    /// Convert the error into a stream error trailer.
    fn to_tailer(&self) -> String;
}

impl ToStreamErrorTrailer for Error {
    /// Render the error as two CRLF-terminated trailer lines: a fixed
    /// `Runtime.StreamError` error type, followed by the error's display text
    /// encoded with standard base64 as the error body.
    fn to_tailer(&self) -> String {
        let encoded_body = BASE64_STANDARD.encode(self.to_string());

        format!(
            "Lambda-Runtime-Function-Error-Type: Runtime.StreamError\r\nLambda-Runtime-Function-Error-Body: {}\r\n",
            encoded_body
        )
    }
}

/// A streaming response that contains the metadata prelude and the stream of bytes that will be
/// sent to the client.
///
/// `S` is the type of the underlying stream.
#[derive(Debug)]
pub struct StreamResponse<S> {
    /// The metadata prelude.
    pub metadata_prelude: MetadataPrelude,
    /// The stream of bytes that will be sent to the client.
    pub stream: S,
}

/// An enum representing the response of a function that can return either a buffered
/// response of type `B` or a streaming response whose stream has type `S`.
pub enum FunctionResponse<B, S> {
    /// A buffered response containing the entire payload of the response. This is useful
    /// for responses that can be processed quickly and have a relatively small payload size (<= 6MB).
    BufferedResponse(B),
    /// A streaming response that delivers the payload incrementally. This is useful for
    /// large payloads (> 6MB) or responses that take a long time to generate. The client can start
    /// processing the response as soon as the first chunk is available, without waiting
    /// for the entire payload to be generated.
    StreamingResponse(StreamResponse<S>),
}

/// A trait that can be implemented for any type that can be converted into a [`FunctionResponse`].
/// This allows callers to use the `into_response` method to convert a value into a
/// `FunctionResponse<B, S>`, where `B` is the buffered payload type and `S` is the stream type.
pub trait IntoFunctionResponse<B, S> {
    /// Convert the type into a FunctionResponse.
    fn into_response(self) -> FunctionResponse<B, S>;
}

impl<B, S> IntoFunctionResponse<B, S> for FunctionResponse<B, S> {
    /// Identity conversion: a `FunctionResponse` is already in its final form
    /// and is returned unchanged.
    fn into_response(self) -> Self {
        self
    }
}

impl<B: Serialize> IntoFunctionResponse<B, Body> for B {
    /// Wrap any serializable value as a buffered response.
    fn into_response(self) -> FunctionResponse<B, Body> {
        let buffered = self;
        FunctionResponse::BufferedResponse(buffered)
    }
}

impl<S, D, E> IntoFunctionResponse<(), S> for StreamResponse<S>
where
    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
    D: Into<Bytes> + Send,
    E: Into<Error> + Send + Debug,
{
    fn into_response(self) -> FunctionResponse<(), S> {
        FunctionResponse::StreamingResponse(self)
    }
}

impl<S, D, E> From<S> for StreamResponse<S>
where
    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
    D: Into<Bytes> + Send,
    E: Into<Error> + Send + Debug,
{
    fn from(value: S) -> Self {
        StreamResponse {
            metadata_prelude: Default::default(),
            stream: value,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::Config;
    use std::sync::Arc;

    // Header name/value pairs shared by most of the tests below.
    const REQUEST_ID_HEADER: (&str, &str) = ("lambda-runtime-aws-request-id", "my-id");
    const DEADLINE_HEADER: (&str, &str) = ("lambda-runtime-deadline-ms", "123");
    const FUNCTION_ARN_HEADER: (&str, &str) = ("lambda-runtime-invoked-function-arn", "arn::myarn");
    const TRACE_ID_HEADER: (&str, &str) = ("lambda-runtime-trace-id", "arn::myarn");

    /// Build a header map from static name/value pairs, inserted in order.
    fn header_map(entries: &[(&'static str, &'static str)]) -> HeaderMap {
        let mut headers = HeaderMap::new();
        for &(name, value) in entries {
            headers.insert(name, HeaderValue::from_static(value));
        }
        headers
    }

    /// Run `Context::new` with the request id `"id"` and a default configuration.
    fn build_context(headers: &HeaderMap) -> Result<Context, Error> {
        Context::new("id", Arc::new(Config::default()), headers)
    }

    #[test]
    fn context_with_expected_values_and_types_resolves() {
        let headers = header_map(&[
            REQUEST_ID_HEADER,
            DEADLINE_HEADER,
            FUNCTION_ARN_HEADER,
            TRACE_ID_HEADER,
        ]);

        assert!(build_context(&headers).is_ok());
    }

    #[test]
    fn context_with_certain_missing_headers_still_resolves() {
        let headers = header_map(&[REQUEST_ID_HEADER, DEADLINE_HEADER]);

        assert!(build_context(&headers).is_ok());
    }

    #[test]
    fn context_with_client_context_resolves() {
        let expected = ClientContext {
            client: ClientApplication::default(),
            custom: HashMap::from([("key".to_string(), "value".to_string())]),
            environment: HashMap::from([("key".to_string(), "value".to_string())]),
        };
        let encoded = serde_json::to_string(&expected).unwrap();

        let mut headers = header_map(&[REQUEST_ID_HEADER, DEADLINE_HEADER]);
        headers.insert(
            "lambda-runtime-client-context",
            HeaderValue::from_str(&encoded).unwrap(),
        );

        let outcome = build_context(&headers);
        assert!(outcome.is_ok());
        let ctx = outcome.unwrap();
        assert!(ctx.client_context.is_some());
        assert_eq!(ctx.client_context.unwrap(), expected);
    }

    #[test]
    fn context_with_empty_client_context_resolves() {
        let headers = header_map(&[
            REQUEST_ID_HEADER,
            DEADLINE_HEADER,
            ("lambda-runtime-client-context", "{}"),
        ]);

        let outcome = build_context(&headers);
        assert!(outcome.is_ok());
        assert!(outcome.unwrap().client_context.is_some());
    }

    #[test]
    fn context_with_identity_resolves() {
        let expected = CognitoIdentity {
            identity_id: String::new(),
            identity_pool_id: String::new(),
        };
        let encoded = serde_json::to_string(&expected).unwrap();

        let mut headers = header_map(&[REQUEST_ID_HEADER, DEADLINE_HEADER]);
        headers.insert(
            "lambda-runtime-cognito-identity",
            HeaderValue::from_str(&encoded).unwrap(),
        );

        let outcome = build_context(&headers);
        assert!(outcome.is_ok());
        let ctx = outcome.unwrap();
        assert!(ctx.identity.is_some());
        assert_eq!(ctx.identity.unwrap(), expected);
    }

    #[test]
    fn context_with_bad_deadline_type_is_err() {
        let headers = header_map(&[
            REQUEST_ID_HEADER,
            ("lambda-runtime-deadline-ms", "BAD-Type,not <u64>"),
            FUNCTION_ARN_HEADER,
            TRACE_ID_HEADER,
        ]);

        assert!(build_context(&headers).is_err());
    }

    #[test]
    fn context_with_bad_client_context_is_err() {
        let headers = header_map(&[
            REQUEST_ID_HEADER,
            DEADLINE_HEADER,
            ("lambda-runtime-client-context", "BAD-Type,not JSON"),
        ]);

        assert!(build_context(&headers).is_err());
    }

    #[test]
    fn context_with_empty_identity_is_err() {
        let headers = header_map(&[
            REQUEST_ID_HEADER,
            DEADLINE_HEADER,
            ("lambda-runtime-cognito-identity", "{}"),
        ]);

        assert!(build_context(&headers).is_err());
    }

    #[test]
    fn context_with_bad_identity_is_err() {
        let headers = header_map(&[
            REQUEST_ID_HEADER,
            DEADLINE_HEADER,
            ("lambda-runtime-cognito-identity", "BAD-Type,not JSON"),
        ]);

        assert!(build_context(&headers).is_err());
    }

    #[test]
    #[should_panic]
    fn context_with_missing_deadline_should_panic() {
        // No deadline header: `Context::new` is expected to panic.
        let headers = header_map(&[REQUEST_ID_HEADER, FUNCTION_ARN_HEADER, TRACE_ID_HEADER]);

        let _ = build_context(&headers);
    }

    #[test]
    fn invoke_request_id_should_not_panic() {
        let headers = header_map(&[
            REQUEST_ID_HEADER,
            DEADLINE_HEADER,
            FUNCTION_ARN_HEADER,
            TRACE_ID_HEADER,
        ]);

        let _ = invoke_request_id(&headers);
    }

    #[test]
    #[should_panic]
    fn invoke_request_id_should_panic() {
        // No request id header: `invoke_request_id` is expected to panic.
        let headers = header_map(&[DEADLINE_HEADER, FUNCTION_ARN_HEADER, TRACE_ID_HEADER]);

        let _ = invoke_request_id(&headers);
    }

    #[test]
    fn serde_metadata_prelude() {
        let original = MetadataPrelude {
            status_code: StatusCode::OK,
            headers: header_map(&[("key", "val")]),
            cookies: vec!["cookie".to_string()],
        };

        let json = serde_json::to_string(&original).unwrap();
        let round_tripped: MetadataPrelude = serde_json::from_str(&json).unwrap();

        assert_eq!(original, round_tripped);
    }
}
